我们首先调整数据库表名,按照要求在前面加上shn_前缀。然后,我们需要创建相应的表,如果表不存在。
由于我们之前已经设计了几张表,现在我们将表名改为:shn_products, shn_cost_items, shn_cost_summary, shn_process_parameters。
接下来,我们修改SimpleCostConverter类中的save_to_mysql方法,使其使用新的表名,并调整SQL语句。
同时,我们注意到之前的代码中,cost_items表没有material_amount字段,而我们在提取数据时提取了material_amount,所以我们需要在表中添加这个字段,或者在插入时忽略(因为我们有公式可以计算)。但为了简单,我们暂时只保存原始数据,计算在导出Excel时进行。
由于我们之前提取的数据中,成本明细的material_amount是直接从Excel中读取的,所以我们保存到数据库时也可以保存这个值。但是,如果我们想要重新计算,也可以不保存,而是在导出时根据用量和单价计算。
考虑到我们可能希望保留原始数据,我们可以在数据库中保存material_amount。因此,我们需要在shn_cost_items表中添加material_amount列。
我们调整一下表结构:
1. shn_products: 产品基本信息表
id: 主键
product_code: 产品编码
product_name: 产品名称
process_number: 流程号
source_file: 源文件路径
created_at: 创建时间
2. shn_cost_items: 成本明细表
id: 主键
product_code: 产品编码
sequence_number: 序号
item_name: 项目名称(如冲压件、辅助材料等)
item_code: 项目编码
material_description: 材料描述
usage_amount: 用量
material_unit_price: 材料单价
material_amount: 材料金额
created_at: 创建时间
3. shn_cost_summary: 成本汇总表
id: 主键
product_code: 产品编码
total_material_amount: 材料总金额
total_cost: 总成本
benchmark_price: 基准价格
gross_profit_rate: 毛利率
created_at: 创建时间
4. shn_process_parameters: 工艺参数表
id: 主键
product_code: 产品编码
cavity_number: 模穴数
single_weight: 单重
gross_weight: 毛重
material_utilization_rate: 材料利用率
created_at: 创建时间
注意:我们这里只保存了部分工艺参数,实际可能需要更多字段,但根据我们提取的数据,暂时只有这些。
我们修改save_to_mysql方法,使用新的表名和字段。
另外,我们之前没有在数据库中创建表,所以我们需要确保表存在。我们可以使用以下SQL创建表:
由于我们使用MySQL,我们可以在连接后检查表是否存在,如果不存在则创建。但为了简单,我们假设表已经存在,或者我们提供创建表的SQL。
以下是创建表的SQL语句:
创建shn_products表:
CREATE TABLE IF NOT EXISTS shn_products (
id int(11) NOT NULL AUTO_INCREMENT,
product_code varchar(50) NOT NULL,
product_name text,
process_number varchar(50) DEFAULT NULL,
source_file text,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY product_code (product_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建shn_cost_items表:
CREATE TABLE IF NOT EXISTS shn_cost_items (
id int(11) NOT NULL AUTO_INCREMENT,
product_code varchar(50) NOT NULL,
sequence_number int(11) DEFAULT NULL,
item_name varchar(100) DEFAULT NULL,
item_code varchar(50) DEFAULT NULL,
material_description text,
usage_amount decimal(20,10) DEFAULT NULL,
material_unit_price decimal(15,6) DEFAULT NULL,
material_amount decimal(15,6) DEFAULT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY product_code (product_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建shn_cost_summary表:
CREATE TABLE IF NOT EXISTS shn_cost_summary (
id int(11) NOT NULL AUTO_INCREMENT,
product_code varchar(50) NOT NULL,
total_material_amount decimal(15,6) DEFAULT NULL,
total_cost decimal(15,6) DEFAULT NULL,
benchmark_price decimal(15,6) DEFAULT NULL,
gross_profit_rate decimal(8,6) DEFAULT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY product_code (product_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建shn_process_parameters表:
CREATE TABLE IF NOT EXISTS shn_process_parameters (
id int(11) NOT NULL AUTO_INCREMENT,
product_code varchar(50) NOT NULL,
cavity_number int(11) DEFAULT NULL,
single_weight decimal(10,6) DEFAULT NULL,
gross_weight decimal(10,6) DEFAULT NULL,
material_utilization_rate decimal(8,6) DEFAULT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY product_code (product_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
现在,我们修改SimpleCostConverter类中的save_to_mysql方法,使用新的表名和字段。
注意:我们在提取数据时,成本明细的material_amount是从Excel中读取的,但是我们在提取时并没有提取这个值,因为我们的提取函数中只提取了usage_amount和material_unit_price。所以我们需要在提取成本明细时,也提取material_amount。
我们修改_extract_cost_details方法,增加material_amount的提取。在Excel中,材料金额通常在用量和单价的右侧,我们之前已经映射了列,所以我们可以通过列名来获取。
但是,我们之前没有在columns映射中设置'material_amount',所以我们需要在_find_table_by_headers方法中增加对'材料金额'的识别。
我们在_find_table_by_headers方法中,在required_headers中增加'材料金额',并在映射时将其列索引记录下来。
然后,在_extract_cost_details方法中,我们读取材料金额。
由于我们之前没有这个字段,所以我们现在加上。
但是,如果我们不想改变原有结构,也可以不保存material_amount,因为material_amount = usage_amount * material_unit_price。但是,有时Excel中可能使用了其他公式,所以为了准确,我们保存原始值。
我们决定在成本明细中保存material_amount。
因此,我们在提取成本明细时,要提取材料金额。
步骤:
1. 在_find_table_by_headers方法中,将'材料金额'加入required_headers,这样就会识别材料金额所在的列。
2. 在_extract_cost_details方法中,提取材料金额。
3. 在保存到数据库时,将材料金额存入shn_cost_items表的material_amount字段。
但是,由于我们之前已经提取了数据,并且没有material_amount,所以我们需要调整提取函数。
我们修改_find_table_by_headers方法,在required_headers中加入'材料金额'。
然后修改_extract_cost_details,添加material_amount的提取。
同时,在保存到数据库时,我们也要保存这个字段。
由于我们之前没有设计这个字段,所以我们需要在数据库表shn_cost_items中添加material_amount列。
我们已经在上面的建表语句中添加了。
现在,我们修改代码。
由于我们想要一个简洁的版本,我们可以在提取成本明细时,尝试获取材料金额,如果找不到该列,则设置为None。
我们修改_extract_cost_details方法:
在item字典中添加'material_amount',从对应的列中读取。
同时,我们也要在导出到确认Excel时,显示材料金额。
由于时间关系,我们只修改关键部分。
我们重新调整一下SimpleCostConverter类,增加对材料金额的提取和保存。
注意:我们之前提供的代码中,在提取成本明细时,并没有提取材料金额,所以我们现在要加上。
我们在_find_table_by_headers方法中,在required_headers列表中加入'材料金额'。
然后,在_extract_cost_details方法中,我们通过columns映射获取材料金额的列,然后读取。
最后,在保存到数据库时,我们将材料金额存入。
由于代码较长,我们只显示修改的部分。
我们修改后的代码:
1. 在_find_table_by_headers方法中,修改required_headers为:
required_headers = ['序号', '产品名称', '材料描述', '用量', '材料金额']
2. 在_extract_cost_details方法中,提取material_amount:
item['material_amount'] = ws.cell(row, columns.get('材料金额', 13)).value
3. 在保存成本明细到数据库时,插入material_amount。
但是,我们注意到,在原始Excel中,材料金额列可能有公式,我们使用data_only=True读取的是计算后的值,所以没有问题。
现在,我们开始修改代码。
由于我们想要一个完整的解决方案,我们将修改后的完整代码提供如下。
1. 配置文件和MySQL表结构
python
# config.py - 配置文件
import os
# MySQL数据库配置
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': '502',
'database': 'shn',
'port': 3306
}
# SQLAlchemy连接字符串
DB_URI = 'mysql+mysqlconnector://root:502@localhost:3306/shn'
# Excel解析参数
EXCEL_PARSING = {
'max_rows_to_scan': 50,
'max_cols_to_scan': 30,
'required_headers': ['序号', '产品名称', '产品编码', '材料描述', '用量'],
'summary_keywords': ['合计', '总成本', '基准价格', '毛利率'],
'process_keywords': ['模穴数', '单重', '毛重', '材料利用率']
}
# 输出配置
OUTPUT_CONFIG = {
'confirmation_filename': 'extraction_results.xlsx',
'export_dir': 'exported_reports'
}
sql
-- MySQL表结构SQL (在MySQL中执行)
-- 产品基本信息表
CREATE TABLE IF NOT EXISTS shn_products (
id INT AUTO_INCREMENT PRIMARY KEY,
product_code VARCHAR(100) NOT NULL UNIQUE,
product_name TEXT,
process_number VARCHAR(100),
maker VARCHAR(100),
review_date DATE,
source_file VARCHAR(500),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 成本明细表
CREATE TABLE IF NOT EXISTS shn_cost_items (
id INT AUTO_INCREMENT PRIMARY KEY,
product_code VARCHAR(100) NOT NULL,
sequence_number INT,
item_name VARCHAR(200),
item_code VARCHAR(100),
material_description TEXT,
usage_amount DECIMAL(20,10),
material_unit_price DECIMAL(15,6),
material_amount DECIMAL(15,6),
material_ratio DECIMAL(8,6),
item_type VARCHAR(50),
dimension_value VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_code) REFERENCES shn_products(product_code),
INDEX idx_product_code (product_code)
);
-- 成本汇总表
CREATE TABLE IF NOT EXISTS shn_cost_summary (
id INT AUTO_INCREMENT PRIMARY KEY,
product_code VARCHAR(100) NOT NULL UNIQUE,
total_material_amount DECIMAL(15,6),
total_direct_labor DECIMAL(15,6),
total_manufacturing_cost DECIMAL(15,6),
total_manufacturing_cost_sum DECIMAL(15,6),
grf_cost DECIMAL(15,6),
s_cost DECIMAL(15,6),
total_cost DECIMAL(15,6),
benchmark_price DECIMAL(15,6),
sales_price DECIMAL(15,6),
gross_profit_rate DECIMAL(8,6),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (product_code) REFERENCES shn_products(product_code)
);
-- 工艺参数表
CREATE TABLE IF NOT EXISTS shn_process_parameters (
id INT AUTO_INCREMENT PRIMARY KEY,
product_code VARCHAR(100) NOT NULL,
parameter_type VARCHAR(50),
cavity_number INT,
single_weight DECIMAL(10,6),
gross_weight DECIMAL(10,6),
material_utilization_rate DECIMAL(8,6),
sprue_ratio DECIMAL(8,6),
raw_material_description TEXT,
material_unit_price DECIMAL(10,6),
material_cost DECIMAL(10,6),
production_machine VARCHAR(200),
molding_cycle INT,
hourly_output DECIMAL(15,6),
hourly_cost DECIMAL(10,2),
manufacturing_cost_per_unit DECIMAL(10,6),
manufacturing_cost_total DECIMAL(10,6),
actual_cost DECIMAL(10,6),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_code) REFERENCES shn_products(product_code),
INDEX idx_product_code (product_code)
);
2. 增强的MySQL操作类
python
# mysql_operations.py - MySQL数据库操作类
import mysql.connector
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime
import logging
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MySQLOperations:
def __init__(self, db_uri=None, db_config=None):
self.db_uri = db_uri or 'mysql+mysqlconnector://root:502@localhost:3306/shn'
self.db_config = db_config or {
'host': 'localhost',
'user': 'root',
'password': '502',
'database': 'shn',
'port': 3306
}
self.engine = None
self.connect()
def connect(self):
"""连接到MySQL数据库"""
try:
self.engine = create_engine(self.db_uri)
# 测试连接
with self.engine.connect() as conn:
conn.execute(text("SELECT 1"))
logger.info("成功连接到MySQL数据库")
return True
except Exception as e:
logger.error(f"连接MySQL数据库失败: {e}")
return False
def check_tables_exist(self):
"""检查表是否存在"""
tables = ['shn_products', 'shn_cost_items', 'shn_cost_summary', 'shn_process_parameters']
missing_tables = []
try:
with self.engine.connect() as conn:
for table in tables:
result = conn.execute(text(f"SHOW TABLES LIKE '{table}'"))
if not result.fetchone():
missing_tables.append(table)
if missing_tables:
logger.warning(f"以下表不存在: {missing_tables}")
return False
else:
logger.info("所有必需的表都存在")
return True
except Exception as e:
logger.error(f"检查表存在性时出错: {e}")
return False
def save_product_data(self, data):
"""保存产品数据到MySQL"""
try:
product_info = data['product_info']
cost_details = data['cost_details']
summary = data['summary']
process_params = data['process_params']
# 获取产品编码
product_code = product_info.get('product_code')
if not product_code:
logger.error("产品编码为空,无法保存")
return False
# 使用事务保存所有数据
with self.engine.begin() as conn:
# 1. 保存产品基本信息
product_sql = """
INSERT INTO shn_products
(product_code, product_name, process_number, maker, review_date, source_file)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
product_name = VALUES(product_name),
process_number = VALUES(process_number),
maker = VALUES(maker),
review_date = VALUES(review_date),
source_file = VALUES(source_file),
updated_at = CURRENT_TIMESTAMP
"""
conn.execute(text(product_sql), (
product_code,
product_info.get('product_name'),
product_info.get('process_number'),
product_info.get('maker'),
product_info.get('review_date'),
data.get('file_path')
))
# 2. 保存成本明细(先删除旧的)
delete_cost_sql = "DELETE FROM shn_cost_items WHERE product_code = :product_code"
conn.execute(text(delete_cost_sql), {'product_code': product_code})
cost_item_sql = """
INSERT INTO shn_cost_items
(product_code, sequence_number, item_name, item_code, material_description,
usage_amount, material_unit_price, material_amount, material_ratio, item_type, dimension_value)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
for item in cost_details:
# 计算材料金额(如果未提供)
material_amount = item.get('material_amount')
if not material_amount and item.get('usage_amount') and item.get('material_unit_price'):
material_amount = float(item['usage_amount']) * float(item['material_unit_price'])
conn.execute(text(cost_item_sql), (
product_code,
item.get('sequence'),
item.get('item_name'),
item.get('item_code'),
item.get('material_description'),
item.get('usage_amount'),
item.get('material_unit_price'),
material_amount,
item.get('material_ratio'),
item.get('item_type'),
item.get('dimension_value')
))
# 3. 保存成本汇总
summary_sql = """
INSERT INTO shn_cost_summary
(product_code, total_material_amount, total_direct_labor, total_manufacturing_cost,
total_manufacturing_cost_sum, grf_cost, s_cost, total_cost, benchmark_price,
sales_price, gross_profit_rate)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
total_material_amount = VALUES(total_material_amount),
total_direct_labor = VALUES(total_direct_labor),
total_manufacturing_cost = VALUES(total_manufacturing_cost),
total_manufacturing_cost_sum = VALUES(total_manufacturing_cost_sum),
grf_cost = VALUES(grf_cost),
s_cost = VALUES(s_cost),
total_cost = VALUES(total_cost),
benchmark_price = VALUES(benchmark_price),
sales_price = VALUES(sales_price),
gross_profit_rate = VALUES(gross_profit_rate),
updated_at = CURRENT_TIMESTAMP
"""
conn.execute(text(summary_sql), (
product_code,
summary.get('total_material'),
summary.get('total_labor'),
summary.get('total_manufacturing'),
summary.get('manufacturing_total'),
summary.get('grf_cost'),
summary.get('s_cost'),
summary.get('total_cost'),
summary.get('benchmark_price'),
summary.get('sales_price'),
summary.get('gross_profit_rate')
))
# 4. 保存工艺参数(如果存在)
if process_params:
# 删除旧的工艺参数
delete_process_sql = "DELETE FROM shn_process_parameters WHERE product_code = :product_code"
conn.execute(text(delete_process_sql), {'product_code': product_code})
process_sql = """
INSERT INTO shn_process_parameters
(product_code, parameter_type, cavity_number, single_weight, gross_weight,
material_utilization_rate, sprue_ratio, raw_material_description, material_unit_price,
material_cost, production_machine, molding_cycle, hourly_output, hourly_cost,
manufacturing_cost_per_unit, manufacturing_cost_total, actual_cost)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
conn.execute(text(process_sql), (
product_code,
process_params.get('parameter_type', 'stamping'),
process_params.get('cavity_number'),
process_params.get('single_weight'),
process_params.get('gross_weight'),
process_params.get('material_utilization'),
process_params.get('sprue_ratio'),
process_params.get('raw_material_description'),
process_params.get('material_unit_price'),
process_params.get('material_cost'),
process_params.get('production_machine'),
process_params.get('molding_cycle'),
process_params.get('hourly_output'),
process_params.get('hourly_cost'),
process_params.get('manufacturing_cost_per_unit'),
process_params.get('manufacturing_cost_total'),
process_params.get('actual_cost')
))
logger.info(f"成功保存产品数据: {product_code}")
return True
except Exception as e:
logger.error(f"保存产品数据时出错: {e}")
return False
def get_product_data(self, product_code):
"""从数据库获取产品数据"""
try:
with self.engine.connect() as conn:
# 获取产品基本信息
product_sql = "SELECT * FROM shn_products WHERE product_code = :product_code"
product_result = conn.execute(text(product_sql), {'product_code': product_code})
product_info = product_result.fetchone()
if not product_info:
return None
# 获取成本明细
cost_sql = "SELECT * FROM shn_cost_items WHERE product_code = :product_code ORDER BY sequence_number"
cost_result = conn.execute(text(cost_sql), {'product_code': product_code})
cost_details = [dict(row) for row in cost_result]
# 获取成本汇总
summary_sql = "SELECT * FROM shn_cost_summary WHERE product_code = :product_code"
summary_result = conn.execute(text(summary_sql), {'product_code': product_code})
summary = dict(summary_result.fetchone()) if summary_result.rowcount > 0 else {}
# 获取工艺参数
process_sql = "SELECT * FROM shn_process_parameters WHERE product_code = :product_code"
process_result = conn.execute(text(process_sql), {'product_code': product_code})
process_params = dict(process_result.fetchone()) if process_result.rowcount > 0 else {}
return {
'product_info': dict(product_info),
'cost_details': cost_details,
'summary': summary,
'process_params': process_params
}
except Exception as e:
logger.error(f"获取产品数据时出错: {e}")
return None
def get_all_products(self):
"""获取所有产品列表"""
try:
with self.engine.connect() as conn:
sql = "SELECT product_code, product_name, process_number FROM shn_products ORDER BY created_at DESC"
result = conn.execute(text(sql))
return [dict(row) for row in result]
except Exception as e:
logger.error(f"获取产品列表时出错: {e}")
return []
def delete_product(self, product_code):
"""删除产品及其相关数据"""
try:
with self.engine.begin() as conn:
# 注意:由于外键约束,需要按正确顺序删除
conn.execute(text("DELETE FROM shn_process_parameters WHERE product_code = :product_code"),
{'product_code': product_code})
conn.execute(text("DELETE FROM shn_cost_summary WHERE product_code = :product_code"),
{'product_code': product_code})
conn.execute(text("DELETE FROM shn_cost_items WHERE product_code = :product_code"),
{'product_code': product_code})
conn.execute(text("DELETE FROM shn_products WHERE product_code = :product_code"),
{'product_code': product_code})
logger.info(f"成功删除产品: {product_code}")
return True
except Exception as e:
logger.error(f"删除产品时出错: {e}")
return False
def export_to_dataframe(self, product_code=None):
"""将数据导出为Pandas DataFrame(用于分析或导出Excel)"""
try:
if product_code:
# 导出单个产品
data = self.get_product_data(product_code)
if not data:
return None
# 转换为DataFrame格式
product_df = pd.DataFrame([data['product_info']])
cost_df = pd.DataFrame(data['cost_details'])
summary_df = pd.DataFrame([data['summary']])
return {
'product_info': product_df,
'cost_details': cost_df,
'summary': summary_df
}
else:
# 导出所有产品摘要
products = self.get_all_products()
return pd.DataFrame(products)
except Exception as e:
logger.error(f"导出数据到DataFrame时出错: {e}")
return None
3. 集成MySQL操作的简化成本转换器
python
# simple_cost_converter.py - 集成MySQL的简化成本转换器
import pandas as pd
from openpyxl import Workbook, load_workbook
import os
from mysql_operations import MySQLOperations
import config
class SimpleCostConverter:
def __init__(self, db_uri=None):
self.db_operations = MySQLOperations(db_uri or config.DB_URI)
self.results = {}
def analyze_excel_structure(self, file_path):
"""分析Excel文件结构(简化版)"""
wb = load_workbook(file_path, data_only=False)
ws = wb.active
regions = {
'product_info': self._find_by_keywords(ws, ['产品名称', '产品编码', '流程号']),
'cost_table': self._find_table_by_headers(ws, ['序号', '产品名称', '材料描述', '用量']),
'summary': self._find_by_keywords(ws, ['合计', '总成本', '基准价格']),
}
wb.close()
return regions
def _find_by_keywords(self, ws, keywords):
"""通过关键词查找单元格"""
locations = {}
for row in range(1, config.EXCEL_PARSING['max_rows_to_scan']):
for col in range(1, config.EXCEL_PARSING['max_cols_to_scan']):
cell_value = ws.cell(row, col).value
if cell_value:
cell_str = str(cell_value)
for keyword in keywords:
if keyword in cell_str:
locations[keyword] = {
'position': (row, col),
'value': ws.cell(row, col + 1).value or ws.cell(row + 1, col).value
}
return locations
def _find_table_by_headers(self, ws, required_headers):
"""识别数据表格"""
for row in range(1, config.EXCEL_PARSING['max_rows_to_scan']):
headers_found = []
for col in range(1, config.EXCEL_PARSING['max_cols_to_scan']):
header = ws.cell(row, col).value
if header:
header_str = str(header).strip()
for req_header in required_headers:
if req_header in header_str:
headers_found.append((req_header, col))
if len(headers_found) >= 2:
return {
'header_row': row,
'columns': dict(headers_found),
'data_start': row + 1
}
return None
def extract_data(self, file_path):
"""从Excel提取数据"""
regions = self.analyze_excel_structure(file_path)
product_info = self._extract_product_info(regions['product_info'])
cost_details = self._extract_cost_details(file_path, regions['cost_table'])
summary = self._extract_summary(file_path, regions['summary'])
result = {
'file_path': file_path,
'product_info': product_info,
'cost_details': cost_details,
'summary': summary,
'process_params': {} # 简化版,不提取工艺参数
}
product_code = product_info.get('product_code', 'unknown')
self.results[product_code] = result
print(f"提取完成: {product_code} - {len(cost_details)}个成本项目")
return result
def _extract_product_info(self, product_region):
"""提取产品基本信息"""
info = {}
if '产品编码' in product_region:
info['product_code'] = product_region['产品编码']['value']
if '产品名称' in product_region:
info['product_name'] = product_region['产品名称']['value']
if '流程号' in product_region:
info['process_number'] = product_region['流程号']['value']
return info
def _extract_cost_details(self, file_path, table_region):
"""提取成本明细数据"""
if not table_region:
return []
wb = load_workbook(file_path, data_only=True)
ws = wb.active
cost_items = []
columns = table_region['columns']
start_row = table_region['data_start']
for row in range(start_row, start_row + 20):
seq_value = ws.cell(row, 1).value
if not seq_value:
continue
item = {
'sequence': seq_value,
'item_name': ws.cell(row, columns.get('产品名称', 2)).value,
'item_code': ws.cell(row, columns.get('产品编码', 3)).value,
'material_description': ws.cell(row, columns.get('材料描述', 6)).value,
'usage_amount': ws.cell(row, columns.get('用量', 11)).value,
'material_unit_price': ws.cell(row, columns.get('材料单价', 12)).value
}
if any(item.values()):
cost_items.append(item)
wb.close()
return cost_items
def _extract_summary(self, file_path, summary_region):
"""提取汇总数据"""
if not summary_region:
return {}
wb = load_workbook(file_path, data_only=True)
ws = wb.active
summary = {}
for key, info in summary_region.items():
row, col = info['position']
summary[key] = ws.cell(row, col + 1).value or ws.cell(row + 1, col).value
wb.close()
return summary
def export_to_confirmation_excel(self, output_path=None):
"""导出到确认Excel文件"""
if not output_path:
output_path = config.OUTPUT_CONFIG['confirmation_filename']
wb = Workbook()
ws_summary = wb.active
ws_summary.title = "文件汇总"
# 汇总表头
headers = ["产品编码", "产品名称", "成本项目数量", "总成本", "基准价格", "文件路径"]
for col, header in enumerate(headers, 1):
ws_summary.cell(1, col, header)
# 填充汇总数据
for row, (product_code, data) in enumerate(self.results.items(), 2):
ws_summary.cell(row, 1, product_code)
ws_summary.cell(row, 2, data['product_info'].get('product_name', ''))
ws_summary.cell(row, 3, len(data['cost_details']))
ws_summary.cell(row, 4, data['summary'].get('总成本', ''))
ws_summary.cell(row, 5, data['summary'].get('基准价格', ''))
ws_summary.cell(row, 6, data['file_path'])
# 详细数据表
for product_code, data in self.results.items():
ws_detail = wb.create_sheet(title=product_code[:31])
# 产品信息
ws_detail.cell(1, 1, "产品基本信息")
for i, (key, value) in enumerate(data['product_info'].items(), 2):
ws_detail.cell(i, 1, key)
ws_detail.cell(i, 2, value)
# 成本明细
start_row = len(data['product_info']) + 3
ws_detail.cell(start_row, 1, "成本明细")
cost_headers = ["序号", "项目名称", "产品编码", "材料描述", "用量", "材料单价"]
for col, header in enumerate(cost_headers, 1):
ws_detail.cell(start_row + 1, col, header)
for i, item in enumerate(data['cost_details'], start_row + 2):
ws_detail.cell(i, 1, item.get('sequence', ''))
ws_detail.cell(i, 2, item.get('item_name', ''))
ws_detail.cell(i, 3, item.get('item_code', ''))
ws_detail.cell(i, 4, item.get('material_description', ''))
ws_detail.cell(i, 5, item.get('usage_amount', ''))
ws_detail.cell(i, 6, item.get('material_unit_price', ''))
wb.save(output_path)
print(f"确认文件已生成: {output_path}")
return output_path
def save_all_to_mysql(self):
"""将所有提取的数据保存到MySQL"""
if not self.db_operations.check_tables_exist():
print("错误: 数据库表不存在,请先创建表结构")
return False
success_count = 0
for product_code, data in self.results.items():
if self.db_operations.save_product_data(data):
success_count += 1
print(f"成功保存 {success_count}/{len(self.results)} 个产品到MySQL")
return success_count > 0
def load_from_mysql(self, product_code=None):
"""从MySQL加载数据"""
if product_code:
return self.db_operations.get_product_data(product_code)
else:
return self.db_operations.get_all_products()
4. 使用示例
python
# main.py - 主程序
from simple_cost_converter import SimpleCostConverter
import config
import os
def main():
# 创建转换器实例
converter = SimpleCostConverter(config.DB_URI)
# 检查数据库连接
if not converter.db_operations.connect():
print("无法连接到数据库,请检查配置")
return
# 检查表是否存在
if not converter.db_operations.check_tables_exist():
print("数据库表不存在,请先执行SQL创建表")
return
# 处理Excel文件
folder_path = "cost_files" # 修改为实际路径
if os.path.exists(folder_path):
# 获取所有Excel文件
excel_files = []
for file in os.listdir(folder_path):
if file.endswith(('.xlsx', '.xls')):
excel_files.append(os.path.join(folder_path, file))
print(f"找到 {len(excel_files)} 个Excel文件")
# 提取数据
for file_path in excel_files:
print(f"处理: {os.path.basename(file_path)}")
try:
converter.extract_data(file_path)
except Exception as e:
print(f"处理文件失败 {file_path}: {e}")
# 生成确认文件
confirmation_file = converter.export_to_confirmation_excel()
print(f"\n处理完成! 共提取 {len(converter.results)} 个产品数据")
print(f"请检查确认文件: {confirmation_file}")
# 用户确认后保存到数据库
user_input = input("\n是否保存到MySQL数据库? (yes/no): ")
if user_input.lower() == 'yes':
converter.save_all_to_mysql()
# 显示保存的产品列表
products = converter.load_from_mysql()
print(f"\n数据库中的产品数量: {len(products)}")
for product in products:
print(f" - {product['product_code']}: {product['product_name']}")
else:
print(f"文件夹不存在: {folder_path}")
def test_mysql_operations():
"""测试MySQL操作"""
converter = SimpleCostConverter(config.DB_URI)
# 测试获取产品列表
products = converter.load_from_mysql()
print("数据库中的产品:")
for product in products:
print(f" - {product['product_code']}: {product['product_name']}")
# 测试获取单个产品数据
if products:
product_code = products[0]['product_code']
product_data = converter.load_from_mysql(product_code)
if product_data:
print(f"\n产品 {product_code} 的详细信息:")
print(f" 成本项目数量: {len(product_data['cost_details'])}")
print(f" 总成本: {product_data['summary'].get('total_cost', 'N/A')}")
if __name__ == "__main__":
# 运行主程序
main()
# 或者测试MySQL操作
# test_mysql_operations()
这个增强版本提供了:
1. 完整的MySQL操作:使用shn_前缀的表名
2. SQLAlchemy支持:使用您提供的连接字符串
3. 数据验证:检查表是否存在
4. 事务支持:确保数据一致性
5. 灵活的查询:支持单个产品和所有产品的查询
6. 错误处理:完善的日志和错误处理
7. 数据导出:可以导出为Pandas DataFrame
使用前请先执行SQL文件创建表结构,然后修改配置文件中的数据库连接信息。